{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Copyright 2014 Brett Slatkin, Pearson Education Inc.\n",
    "#\n",
    "# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
    "# you may not use this file except in compliance with the License.\n",
    "# You may obtain a copy of the License at\n",
    "#\n",
    "#     http://www.apache.org/licenses/LICENSE-2.0\n",
    "#\n",
    "# Unless required by applicable law or agreed to in writing, software\n",
    "# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
    "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
    "# See the License for the specific language governing permissions and\n",
    "# limitations under the License.\n",
    "\n",
    "# Preamble to mimick book environment\n",
    "import logging\n",
    "from pprint import pprint\n",
    "from sys import stdout as STDOUT"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Example 1\n",
    "class InputData(object):\n",
    "    def read(self):\n",
    "        raise NotImplementedError"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Example 2\n",
    "class PathInputData(InputData):\n",
    "    def __init__(self, path):\n",
    "        super().__init__()\n",
    "        self.path = path\n",
    "\n",
    "    def read(self):\n",
    "        return open(self.path).read()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Example 3\n",
    "class Worker(object):\n",
    "    def __init__(self, input_data):\n",
    "        self.input_data = input_data\n",
    "        self.result = None\n",
    "\n",
    "    def map(self):\n",
    "        raise NotImplementedError\n",
    "\n",
    "    def reduce(self, other):\n",
    "        raise NotImplementedError"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Example 4\n",
    "class LineCountWorker(Worker):\n",
    "    def map(self):\n",
    "        data = self.input_data.read()\n",
    "        self.result = data.count('\\n')\n",
    "\n",
    "    def reduce(self, other):\n",
    "        self.result += other.result"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Example 5\n",
    "import os\n",
    "\n",
    "def generate_inputs(data_dir):\n",
    "    for name in os.listdir(data_dir):\n",
    "        yield PathInputData(os.path.join(data_dir, name))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Example 6\n",
    "def create_workers(input_list):\n",
    "    workers = []\n",
    "    for input_data in input_list:\n",
    "        workers.append(LineCountWorker(input_data))\n",
    "    return workers"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Example 7\n",
    "from threading import Thread\n",
    "\n",
    "def execute(workers):\n",
    "    threads = [Thread(target=w.map) for w in workers]\n",
    "    for thread in threads: thread.start()\n",
    "    for thread in threads: thread.join()\n",
    "\n",
    "    first, rest = workers[0], workers[1:]\n",
    "    for worker in rest:\n",
    "        first.reduce(worker)\n",
    "    return first.result"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Example 8\n",
    "def mapreduce(data_dir):\n",
    "    inputs = generate_inputs(data_dir)\n",
    "    workers = create_workers(inputs)\n",
    "    return execute(workers)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "There are 4826 lines\n"
     ]
    }
   ],
   "source": [
    "# Example 9\n",
    "from tempfile import TemporaryDirectory\n",
    "import random\n",
    "\n",
    "def write_test_files(tmpdir):\n",
    "    for i in range(100):\n",
    "        with open(os.path.join(tmpdir, str(i)), 'w') as f:\n",
    "            f.write('\\n' * random.randint(0, 100))\n",
    "\n",
    "with TemporaryDirectory() as tmpdir:\n",
    "    write_test_files(tmpdir)\n",
    "    result = mapreduce(tmpdir)\n",
    "\n",
    "print('There are', result, 'lines')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Example 10\n",
    "class GenericInputData(object):\n",
    "    def read(self):\n",
    "        raise NotImplementedError\n",
    "\n",
    "    @classmethod\n",
    "    def generate_inputs(cls, config):\n",
    "        raise NotImplementedError"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Example 11\n",
    "class PathInputData(GenericInputData):\n",
    "    def __init__(self, path):\n",
    "        super().__init__()\n",
    "        self.path = path\n",
    "\n",
    "    def read(self):\n",
    "        return open(self.path).read()\n",
    "\n",
    "    @classmethod\n",
    "    def generate_inputs(cls, config):\n",
    "        data_dir = config['data_dir']\n",
    "        for name in os.listdir(data_dir):\n",
    "            yield cls(os.path.join(data_dir, name))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Example 12\n",
    "class GenericWorker(object):\n",
    "    def __init__(self, input_data):\n",
    "        self.input_data = input_data\n",
    "        self.result = None\n",
    "\n",
    "    def map(self):\n",
    "        raise NotImplementedError\n",
    "\n",
    "    def reduce(self, other):\n",
    "        raise NotImplementedError\n",
    "\n",
    "    @classmethod\n",
    "    def create_workers(cls, input_class, config):\n",
    "        workers = []\n",
    "        for input_data in input_class.generate_inputs(config):\n",
    "            workers.append(cls(input_data))\n",
    "        return workers"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Example 13\n",
    "class LineCountWorker(GenericWorker):\n",
    "    def map(self):\n",
    "        data = self.input_data.read()\n",
    "        self.result = data.count('\\n')\n",
    "\n",
    "    def reduce(self, other):\n",
    "        self.result += other.result"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Example 14\n",
    "def mapreduce(worker_class, input_class, config):\n",
    "    workers = worker_class.create_workers(input_class, config)\n",
    "    return execute(workers)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "There are 4567 lines\n"
     ]
    }
   ],
   "source": [
    "# Example 15\n",
    "with TemporaryDirectory() as tmpdir:\n",
    "    write_test_files(tmpdir)\n",
    "    config = {'data_dir': tmpdir}\n",
    "    result = mapreduce(LineCountWorker, PathInputData, config)\n",
    "print('There are', result, 'lines')"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.1"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
